package cn.com.lyb.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

public class SinkKafkaWithKeyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 如果是精准一次，必须开启checkpoint
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> dataDS = env.socketTextStream("172.32.1.6", 1234);

        /**
         * Kafka Sink:
         * TODO 注意：如果要使用 精准一次 写入Kafka，需要满足以下条件，缺一不可
         * 1、开启checkpoint（后续介绍）
         * 2、设置事务前缀
         * 3、设置事务超时时间：   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink
                .<String>builder()
                .setBootstrapServers("172.32.1.7:9092")
                /**
                 * 如果要指定写入kafka的key，可以自定义序列化器：
                 * 1、实现 一个接口，重写 序列化 方法
                 * 2、指定key，转成 字节数组
                 * 3、指定value，转成 字节数组
                 * 4、返回一个 ProducerRecord对象，把topic、key、value放进去
                 */

                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {
                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext kafkaSinkContext, Long aLong) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("canal_test", key, bytes);
                            }
                        }
                )
                // 写入kafka的一致性级别  精准一次，至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是选择精准一次，必须设置 事务前缀
                .setTransactionalIdPrefix("lyb-")
                // 如果是精准一次，必须设置 事务超时时间: 大于checkpoint间隔，小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();
        dataDS.sinkTo(kafkaSink);
// /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka/bin
// ./kafka-console-consumer.sh --bootstrap-server 172.32.1.7:9092 --topic canal_test --from-beginning
        env.execute();
    }
}
